Variational Inference with Implicit Approximate Inference Models (WIP Pt. 8)
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
import numpy as np
import keras.backend as K
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import logistic, multivariate_normal, norm
from scipy.special import expit
from keras.models import Model, Sequential
from keras.layers import Activation, Dense, Dot, Input
from keras.optimizers import Adam
from keras.utils.vis_utils import model_to_dot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation
from IPython.display import HTML, SVG, display_html
from tqdm import tnrange, tqdm_notebook
# display animation inline
plt.rc('animation', html='html5')
plt.style.use('seaborn-notebook')
sns.set_context('notebook')
np.set_printoptions(precision=2,
                    edgeitems=3,
                    linewidth=80,
                    suppress=True)
# Show the TensorFlow version bundled with the Keras backend
# (`K.tf` only exists on old TF1-era Keras versions).
K.tf.__version__
# --- Experiment constants ---
LATENT_DIM = 2        # dimensionality of the latent weights w
NOISE_DIM = 3         # dimensionality of the noise input epsilon to the inference net
BATCH_SIZE = 200      # samples per class per training batch
PRIOR_VARIANCE = 2.   # isotropic variance of the Gaussian prior p(w)
LEARNING_RATE = 3e-3  # Adam learning rate for both models
PRETRAIN_EPOCHS = 60  # discriminator pre-training epochs
Bayesian Logistic Regression (Synthetic Data)¶
# 300x300 evaluation grid over the weight space [-5, 5]^2.
w_min, w_max = -5, 5
w1, w2 = np.mgrid[w_min:w_max:300j, w_min:w_max:300j]
w_grid = np.stack((w1, w2), axis=-1)
w_grid.shape
# Isotropic Gaussian prior p(w) = N(0, PRIOR_VARIANCE * I),
# evaluated in log-space over the whole grid.
prior = multivariate_normal(mean=np.zeros(LATENT_DIM), cov=PRIOR_VARIANCE)
log_prior = prior.logpdf(w_grid)
log_prior.shape
# Contour plot of the log-prior surface.
fig, ax = plt.subplots(figsize=(5, 5))
ax.contourf(w1, w2, log_prior, cmap='magma')
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
# Three synthetic 2-D datapoints with binary labels.
x1, x2, x3 = (np.array(v) for v in ([1.5, 1.], [-1.5, 1.], [.5, -1.]))
X = np.stack((x1, x2, x3))
X.shape
y1, y2, y3 = 1, 1, 0
y = np.array([y1, y2, y3])
y.shape
def log_likelihood(w, x, y):
    """Bernoulli log-likelihood log p(y | x, w) for logistic regression.

    Equivalent to the negative binary cross-entropy. The sign trick
    (-1)**(1-y) maps labels {0, 1} to logit signs {-1, +1}, so
    log p(y|x,w) = log sigmoid(z) with z = +/- w.x.

    Numerically stable: log sigmoid(z) = -logaddexp(0, -z), which avoids
    the underflow of log(expit(z)) -> log(0) = -inf for large negative z.
    Broadcasts over leading grid axes of `w` and over datapoints in `x`/`y`.
    """
    z = np.dot(w.T, x) * (-1) ** (1 - y)
    return -np.logaddexp(0., -z)
# Per-datapoint log-likelihood surfaces over the weight grid.
llhs = log_likelihood(w_grid.T, X.T, y)
llhs.shape
fig, axes = plt.subplots(ncols=3, nrows=1, figsize=(6, 2))
fig.tight_layout()
for i, ax in enumerate(axes):
    ax.contourf(w1, w2, llhs[..., i], cmap=plt.cm.magma)
    ax.set(xlim=(w_min, w_max), ylim=(w_min, w_max),
           title='$p(y_{{{0}}} \mid x_{{{0}}}, w)$'.format(i+1),
           xlabel='$w_1$')
    # y-label only on the leftmost panel
    if i == 0:
        ax.set_ylabel('$w_2$')
plt.show()
# Joint log-likelihood: sum over datapoints of the per-point surfaces.
fig, ax = plt.subplots(figsize=(5, 5))
ax.contourf(w1, w2, llhs.sum(axis=2), cmap=plt.cm.magma)
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
# Unnormalized posterior exp(log p(w) + sum_i log p(y_i|x_i, w)),
# with the datapoints overlaid (colored by label).
fig, ax = plt.subplots(figsize=(5, 5))
unnormalized_posterior = np.exp(log_prior + llhs.sum(axis=2))
ax.contourf(w1, w2, unnormalized_posterior, cmap='magma')
ax.scatter(*X.T, c=y, cmap='coolwarm', marker=',')
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
Model Definitions¶
Density Ratio Estimator (Discriminator) Model¶
$T_{\psi}(x, z)$
Here we consider
$T_{\psi}(w)$
$T_{\psi} : \mathbb{R}^2 \to \mathbb{R}$
# Discriminator T(w): R^2 -> (0, 1), trained to tell approximate-posterior
# samples (class 1) from prior samples (class 0).
discriminator = Sequential(
    [Dense(10, input_dim=LATENT_DIM, activation='relu'),
     Dense(20, activation='relu'),
     Dense(1, activation=None, name='logit'),
     Activation('sigmoid')],
    name='discriminator')
discriminator.compile(optimizer=Adam(lr=LEARNING_RATE),
                      loss='binary_crossentropy',
                      metrics=['binary_accuracy'])
# Shares weights with the discriminator but outputs the pre-sigmoid logit,
# which serves as the density-ratio estimate used in the ELBO below.
ratio_estimator = Model(inputs=discriminator.inputs,
                        outputs=discriminator.get_layer(name='logit').output)
SVG(model_to_dot(discriminator, show_shapes=True)
    .create(prog='dot', format='svg'))
# Evaluate the (so far untrained) ratio-estimator logit over the whole grid.
w_grid_ratio = ratio_estimator.predict(w_grid.reshape(-1, 2)).reshape(300, 300)
Initial density ratio, prior to any training
# Visualize the untrained ratio surface, then sanity-check that the
# discriminator evaluates on prior samples labelled as class 0.
fig, ax = plt.subplots(figsize=(5, 5))
ax.contourf(w1, w2, w_grid_ratio, cmap=plt.cm.magma)
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
discriminator.evaluate(prior.rvs(size=5), np.zeros(5))
Approximate Inference Model¶
$z_{\phi}(x, \epsilon)$
Here we only consider
$z_{\phi}(\epsilon)$
$z_{\phi}: \mathbb{R}^3 \to \mathbb{R}^2$
# Implicit approximate-posterior sampler z(eps): R^3 -> R^2; pushing
# standard-normal noise through this net yields samples w ~ q(w).
inference = Sequential([
    Dense(10, input_dim=NOISE_DIM, activation='relu'),
    Dense(20, activation='relu'),
    Dense(LATENT_DIM, activation=None),
])
inference.summary()
The variational parameters $\phi$ are the trainable weights of the approximate inference model
# The variational parameters phi are the trainable weights of the
# approximate inference network.
phi = inference.trainable_weights
phi
SVG(model_to_dot(inference, show_shapes=True)
    .create(prog='dot', format='svg'))
# Draw one batch from the prior p(w) and one from the implicit posterior
# q(w) (noise pushed through the inference network).
w_sample_prior = prior.rvs(size=BATCH_SIZE)
w_sample_prior.shape
eps = np.random.randn(BATCH_SIZE, NOISE_DIM)
w_sample_posterior = inference.predict(eps)
w_sample_posterior.shape
# Class 0 = prior samples, class 1 = approximate-posterior samples.
inputs = np.concatenate((w_sample_prior, w_sample_posterior), axis=0)
targets = np.concatenate((np.zeros(BATCH_SIZE), np.ones(BATCH_SIZE)))
# Overlay both sample batches on the unnormalized posterior.
fig, ax = plt.subplots(figsize=(5, 5))
ax.contourf(w1, w2,
            np.exp(log_prior + llhs.sum(axis=2)),
            cmap=plt.cm.magma)
ax.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
# Score the untrained discriminator on the mixed batch and refresh the
# logit surface over the grid.
metrics = discriminator.evaluate(inputs, targets)
w_grid_ratio = ratio_estimator.predict(w_grid.reshape(-1, 2)).reshape(300, 300)
metrics
metrics_dict = dict(zip(discriminator.metrics_names, metrics))
# Live training view consumed by `train_animate` below:
# ax1 tracks metrics over epochs, ax2 shows the current ratio-estimator
# surface with the latest prior/posterior samples.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(9, 4))
# One line artist per tracked metric; only 'loss' is animated here —
# swap in `discriminator.metrics_names` to track all metrics.
metrics_plots = {k:ax1.plot([], label=k)[0]
                 for k in ['loss']} # discriminator.metrics_names}
ax1.set_xlabel('epoch')
ax1.legend(loc='upper left')
ax2.contourf(w1, w2, w_grid_ratio, cmap='magma')
ax2.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
ax2.set_xlabel('$w_1$')
ax2.set_ylabel('$w_2$')
ax2.set_xlim(w_min, w_max)
ax2.set_ylim(w_min, w_max)
plt.show()
Discriminator pre-training¶
def train_animate(epoch_num, prog_bar, batch_size=200, steps_per_epoch=15):
    """Run one discriminator pre-training epoch and refresh the live plots.

    Called once per frame by `FuncAnimation`. Trains on `steps_per_epoch`
    fresh batches of prior (class 0) vs. approximate-posterior (class 1)
    samples, appends the last batch's metrics to the metric line plots on
    `ax1`, and redraws the ratio-estimator surface on `ax2`.

    Returns the updated line artists, as required for blitting.
    """
    # Single training epoch (loop index unused; `_` for consistency with
    # the adversarial `train_animate` defined later)
    for _ in tnrange(steps_per_epoch, unit='step', leave=False):
        w_sample_prior = prior.rvs(size=batch_size)
        eps = np.random.randn(batch_size, NOISE_DIM)
        w_sample_posterior = inference.predict(eps)
        # class 0 = prior samples, class 1 = approx.-posterior samples
        inputs = np.vstack((w_sample_prior, w_sample_posterior))
        targets = np.hstack((np.zeros(batch_size), np.ones(batch_size)))
        metrics = discriminator.train_on_batch(inputs, targets)
    # Plot metrics (from the *last* batch of the epoch only)
    metrics_dict = dict(zip(discriminator.metrics_names, metrics))
    for metric in metrics_plots:
        line = metrics_plots[metric]
        line.set_xdata(np.append(line.get_xdata(), epoch_num))
        line.set_ydata(np.append(line.get_ydata(), metrics_dict[metric]))
        line.set_label('{} ({:.2f})'.format(metric, metrics_dict[metric]))
    ax1.set_xlabel('epoch {:2d}'.format(epoch_num))
    ax1.legend(loc='upper left')
    ax1.relim()
    ax1.autoscale_view()
    # Contour plot of the current ratio-estimator surface
    ax2.cla()
    w_grid_ratio = ratio_estimator.predict(w_grid.reshape(300*300, 2))
    w_grid_ratio = w_grid_ratio.reshape(300, 300)
    ax2.contourf(w1, w2, w_grid_ratio, cmap='magma')
    ax2.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
    ax2.set_xlabel('$w_1$')
    ax2.set_ylabel('$w_2$')
    ax2.set_xlim(w_min, w_max)
    ax2.set_ylim(w_min, w_max)
    # Progress bar updates
    prog_bar.update()
    prog_bar.set_postfix(**metrics_dict)
    return list(metrics_plots.values())
# The main training loop is managed by the higher-order
# FuncAnimation, which calls an `animate` function that
# encapsulates the logic of a single training epoch.
# This has the benefit of producing an animation, but
# can incur significant overhead.
# NOTE: FuncAnimation is lazy — the training epochs actually run when
# `to_html5_video()` renders the frames, which is why it must happen
# inside the progress-bar context.
with tqdm_notebook(total=PRETRAIN_EPOCHS,
                   unit='epoch', leave=True) as prog_bar:
    anim = FuncAnimation(fig,
                         train_animate,
                         fargs=(prog_bar,),
                         frames=PRETRAIN_EPOCHS,
                         interval=200, # 5 fps
                         blit=True)
    anim_html5_video = anim.to_html5_video()
HTML(anim_html5_video)
# Re-score the final pre-training batch, then visualize the learned ratio
# surface with the batch overlaid and the metrics printed in a text box.
inputs = np.concatenate((w_sample_prior, w_sample_posterior), axis=0)
targets = np.concatenate((np.zeros(BATCH_SIZE), np.ones(BATCH_SIZE)))
metrics = discriminator.evaluate(inputs, targets)
w_grid_ratio = ratio_estimator.predict(w_grid.reshape(-1, 2)).reshape(300, 300)
fig, ax = plt.subplots(figsize=(5, 5))
ax.contourf(w1, w2, w_grid_ratio, cmap='magma')
ax.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
metrics_dict = dict(zip(discriminator.metrics_names, metrics))
props = dict(boxstyle='round', facecolor='w', alpha=0.5)
ax.text(0.05, 0.05,
        ('accuracy: {binary_accuracy:.2f}\n'
         'loss: {loss:.2f}').format(**metrics_dict),
        transform=ax.transAxes, bbox=props)
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
Evidence lower bound¶
def set_trainable(model, trainable):
    """Recursively set the `trainable` flag on `model` and, when it is a
    `Model` (i.e. has layers), on every layer beneath it.

    Pre-order traversal: the parent's flag is set before its children's.
    """
    model.trainable = trainable
    if isinstance(model, Model): # i.e. has layers
        for layer in model.layers:
            set_trainable(layer, trainable)
# Rebuild the likelihood surface with Keras backend ops so it can later be
# part of the symbolic loss; cross-check against the NumPy version.
y_pred = K.sigmoid(K.dot(
    K.constant(w_grid),
    K.transpose(K.constant(X))))
y_pred
# Broadcast the 3 labels over the (300, 300) grid.
y_true = K.ones((300, 300, 1))*K.constant(y)
y_true
# NOTE(review): arguments here are (output, target), which matches the old
# Keras backend signature of `binary_crossentropy`; newer Keras versions
# use (target, output) — verify against the installed version.
llhs_keras = - K.binary_crossentropy(
    y_pred,
    y_true,
    from_logits=False)
# TF1-style evaluation of the symbolic tensors in the backend session;
# should print True.
sess = K.get_session()
np.allclose(np.sum(llhs, axis=-1),
            sess.run(K.sum(llhs_keras, axis=-1)))
# Plot the Keras-computed joint log-likelihood surface (should match the
# earlier NumPy plot).
fig, ax = plt.subplots(figsize=(5, 5))
llhs_keras_sum = sess.run(K.sum(llhs_keras, axis=-1))
ax.contourf(w1, w2, llhs_keras_sum, cmap=plt.cm.magma)
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
def make_elbo(ratio_estimator):
    """Build a Keras loss computing the (negative-free) ELBO using the
    ratio estimator's logit in place of the intractable KL term.
    """
    # Freeze the ratio estimator: its weights must not be updated while the
    # inference network is optimized against this objective.
    set_trainable(ratio_estimator, False)
    def elbo(y_true, w_sample):
        # The logit T(w) plays the role of log q(w)/p(w), so its average
        # under q estimates KL[q || p] — TODO confirm sign convention.
        kl_estimate = ratio_estimator(w_sample)
        y_pred = K.dot(w_sample, K.transpose(K.constant(X)))
        # NOTE(review): (output, target) argument order matches the old
        # Keras backend signature — verify for newer versions.
        log_likelihood = - K.binary_crossentropy(y_pred, y_true,
                                                 from_logits=True)
        return K.mean(log_likelihood-kl_estimate, axis=-1)
    return elbo
# Evaluate and plot the ELBO integrand over the weight grid.
elbo = make_elbo(ratio_estimator)
fig, ax = plt.subplots(figsize=(5, 5))
elbo_surface = sess.run(elbo(y_true, K.constant(w_grid)))
ax.contourf(w1, w2, elbo_surface, cmap=plt.cm.magma)
ax.set(xlabel='$w_1$', ylabel='$w_2$',
       xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
def inference_loss(y_true, w_sample):
    """Negative ELBO, minimized w.r.t. the inference network's weights.

    (PEP 8 E731: a `def` rather than an assigned lambda; behavior is
    unchanged — `make_elbo` is still invoked per call, re-freezing the
    ratio estimator.)
    """
    return -make_elbo(ratio_estimator)(y_true, w_sample)

inference.compile(loss=inference_loss,
                  optimizer=Adam(lr=LEARNING_RATE))
eps = np.random.randn(BATCH_SIZE, NOISE_DIM)
# Broadcast the 3 labels across the batch: shape (BATCH_SIZE, 3).
y_true = K.repeat_elements(K.expand_dims(K.constant(y), axis=0),
                           axis=0, rep=BATCH_SIZE)
y_true
# Sanity check: mean ELBO evaluated symbolically should agree (up to sign)
# with the compiled model's loss on the same noise batch.
sess.run(K.mean(elbo(y_true, inference(K.constant(eps))), axis=-1))
inference.evaluate(eps, np.tile(y, reps=(BATCH_SIZE, 1)))
Adversarial Training¶
# Live view for adversarial training: ax1 tracks both losses, ax2 shows
# the current ratio-estimator surface with the latest samples.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(9, 4))
# Epoch counter shared with `train_animate` across repeated runs.
global_epoch = 0
loss_plot_inference, = ax1.plot([], label='inference')
loss_plot_discrim, = ax1.plot([], label='discriminator')
ax1.set(xlabel='epoch', ylabel='loss')
ax1.legend(loc='upper left')
ax2.contourf(w1, w2, w_grid_ratio, cmap='magma')
ax2.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
ax2.set(xlabel='$w_1$', ylabel='$w_2$',
        xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
def train_animate(epoch_num, prog_bar, batch_size=200,
                  steps_per_epoch=15):
    """One adversarial epoch — discriminator updates, then one inference
    update — plus live-plot refresh. Called per frame by `FuncAnimation`.

    NOTE(review): `batch_size` and `steps_per_epoch` are unused here; the
    body uses the global `BATCH_SIZE` and hard-coded step counts — confirm
    whether that is intentional.
    """
    global global_epoch, loss_plot_inference, loss_plot_discrim
    # Single training epoch
    ## Ratio estimator training: 3*50 = 150 steps per epoch
    set_trainable(discriminator, True)
    for _ in tnrange(3*50, unit='step', desc='discriminator',
                     leave=False):
        w_sample_prior = prior.rvs(size=BATCH_SIZE)
        eps = np.random.randn(BATCH_SIZE, NOISE_DIM)
        w_sample_posterior = inference.predict(eps)
        # class 0 = prior samples, class 1 = approx.-posterior samples
        inputs = np.vstack((w_sample_prior, w_sample_posterior))
        targets = np.hstack((np.zeros(BATCH_SIZE), np.ones(BATCH_SIZE)))
        metrics_discrim = discriminator.train_on_batch(inputs, targets)
    # metrics from the last discriminator batch only
    metrics_dict_discrim = dict(zip(discriminator.metrics_names,
                                    np.atleast_1d(metrics_discrim)))
    ## Inference model training: single step, ratio estimator frozen
    set_trainable(ratio_estimator, False)
    y_tiled = np.tile(y, reps=(BATCH_SIZE, 1))
    for _ in tnrange(1, unit='step', desc='inference', leave=False):
        eps = np.random.randn(BATCH_SIZE, NOISE_DIM)
        metrics_inference = inference.train_on_batch(eps, y_tiled)
    metrics_dict_inference = dict(zip(inference.metrics_names,
                                      np.atleast_1d(metrics_inference)))
    global_epoch += 1
    # Plot Loss: append this epoch's losses to both line plots
    loss_plot_inference.set_xdata(np.append(loss_plot_inference.get_xdata(),
                                            global_epoch))
    loss_plot_inference.set_ydata(np.append(loss_plot_inference.get_ydata(),
                                            metrics_dict_inference['loss']))
    loss_plot_inference.set_label('inference ({:.2f})' \
                                  .format(metrics_dict_inference['loss']))
    loss_plot_discrim.set_xdata(np.append(loss_plot_discrim.get_xdata(),
                                          global_epoch))
    loss_plot_discrim.set_ydata(np.append(loss_plot_discrim.get_ydata(),
                                          metrics_dict_discrim['loss']))
    loss_plot_discrim.set_label('discriminator ({:.2f})' \
                                .format(metrics_dict_discrim['loss']))
    ax1.set_xlabel('epoch {:2d}'.format(global_epoch))
    ax1.legend(loc='upper left')
    ax1.relim()
    ax1.autoscale_view()
    # Contour Plot of the current ratio-estimator surface
    ax2.cla()
    w_grid_ratio = ratio_estimator.predict(w_grid.reshape(300*300, 2))
    w_grid_ratio = w_grid_ratio.reshape(300, 300)
    ax2.contourf(w1, w2, w_grid_ratio, cmap='magma')
    ax2.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
    ax2.set_xlabel('$w_1$')
    ax2.set_ylabel('$w_2$')
    ax2.set_xlim(w_min, w_max)
    ax2.set_ylim(w_min, w_max)
    # Progress Bar Updates
    prog_bar.update()
    prog_bar.set_postfix(loss_inference=metrics_dict_inference['loss'],
                         loss_discriminator=metrics_dict_discrim['loss'])
    return loss_plot_inference, loss_plot_discrim
# Each of the following four identical cells continues adversarial
# training for another 50 epochs: rendering the animation drives
# `train_animate`, which keeps incrementing the shared `global_epoch`.
with tqdm_notebook(total=50,
                   unit='epoch', leave=True) as prog_bar:
    anim = FuncAnimation(fig,
                         train_animate,
                         fargs=(prog_bar,),
                         frames=50,
                         interval=200, # 5 fps
                         blit=True)
    anim_html5_video = anim.to_html5_video()
HTML(anim_html5_video)
# (repeated cell: 50 more epochs)
with tqdm_notebook(total=50,
                   unit='epoch', leave=True) as prog_bar:
    anim = FuncAnimation(fig,
                         train_animate,
                         fargs=(prog_bar,),
                         frames=50,
                         interval=200, # 5 fps
                         blit=True)
    anim_html5_video = anim.to_html5_video()
HTML(anim_html5_video)
# (repeated cell: 50 more epochs)
with tqdm_notebook(total=50,
                   unit='epoch', leave=True) as prog_bar:
    anim = FuncAnimation(fig,
                         train_animate,
                         fargs=(prog_bar,),
                         frames=50,
                         interval=200, # 5 fps
                         blit=True)
    anim_html5_video = anim.to_html5_video()
HTML(anim_html5_video)
# (repeated cell: 50 more epochs)
with tqdm_notebook(total=50,
                   unit='epoch', leave=True) as prog_bar:
    anim = FuncAnimation(fig,
                         train_animate,
                         fargs=(prog_bar,),
                         frames=50,
                         interval=200, # 5 fps
                         blit=True)
    anim_html5_video = anim.to_html5_video()
HTML(anim_html5_video)
Evaluating the model¶
# Final discriminator check: 128 prior samples (class 0) vs. 256
# approximate-posterior samples (class 1), shown over the learned ratio
# surface (left) and the true joint likelihood (right).
w_sample_prior = prior.rvs(size=128)
eps = np.random.randn(256, NOISE_DIM)
w_sample_posterior = inference.predict(eps)
inputs = np.concatenate((w_sample_prior, w_sample_posterior), axis=0)
targets = np.concatenate((np.zeros(128), np.ones(256)))
w_grid_ratio = ratio_estimator.predict(w_grid.reshape(-1, 2)).reshape(300, 300)
fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(9, 4))
ax1.contourf(w1, w2, w_grid_ratio, cmap='magma')
ax1.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
ax1.set(xlabel='$w_1$', xlim=(w_min, w_max), ylim=(w_min, w_max))
ax2.contourf(w1, w2, llhs.sum(axis=2), cmap=plt.cm.magma)
ax2.scatter(*inputs.T, c=targets, s=4.**2, alpha=.8, cmap='coolwarm')
ax2.set(xlabel='$w_1$', ylabel='$w_2$',
        xlim=(w_min, w_max), ylim=(w_min, w_max))
plt.show()
# Draw many posterior samples and compare: a thinned scatter over the true
# unnormalized posterior (left) vs. a KDE of all samples (right).
eps = np.random.randn(5000, NOISE_DIM)
w_sample_posterior = inference.predict(eps)
fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(9, 4))
ax1.contourf(w1, w2,
             np.exp(log_prior + llhs.sum(axis=2)),
             cmap=plt.cm.magma)
ax1.scatter(*inference.predict(eps[::10]).T,
            s=4.**2, alpha=.6, cmap='coolwarm_r')
ax1.set(xlabel='$w_1$', ylabel='$w_2$',
        xlim=(w_min, w_max), ylim=(w_min, w_max))
sns.kdeplot(*inference.predict(eps).T, cmap='magma', ax=ax2)
plt.show()